我們在這幾篇文章中時不時的會提到 RxJava 可以跟哪個 library 搭配使用,究竟是怎樣厲害的工具會讓像 Retrofit、Room 等 library 都列入支援的重點呢?
Rx 其實是 ReactiveX 的縮寫,RxJava 是 ReactiveX 在 Java 的一種實現,那 ReactiveX 又代表什麼呢?
如果我們說 Java 是 object oriented programming,所有的一切都是以物件為出發點,那 kotlin 就是 functional programming,所有一切都是由 function 組成,可以把 function 丟來丟去,那 ReactiveX 就是以資料為中心,所有的關注點都是在資料的流動上。
那大家覺得
C
這個語言是以什麼為中心呢?筆者認為應該是 computer 吧!XD
用流動這個詞來形容是有特別用意的,原始資料透過一連串 function 的串接來定義資料的不同階段,我們可以在每個階段對資料做處理然後讓新的資料流到下個階段,同時也可以指定資料流在哪個階段要跑在哪個 Thread 上,讓 async 的工作變成一連串的 function call,我們也就不再需要面對 callback hell。
還記得我們之前提到的 Hollywood Principle 嗎?他也有另一個名字叫做觀察者模式,首先有個被觀察者(Observable
)跟觀察者(Observer
)二種角色,觀察者可以使用 subscribe
跟被觀察者註冊事件,當 Observable
有新資料的時候,就會把資料當成事件發射出來。
不意外的,我們又必須先加上 RxJava 的 dependency:
dependencies {
implementation 'io.reactivex.rxjava2:rxjava:2.2.0'
}
這邊是一個小範例,雖然我們還不了解為什麼要這樣寫,但可以看一下結構感受一下:
Observable
.just(1, 2, 3)
.subscribe(object : Observer<Int> {
override fun onNext(i: Int) {
println(i)
}
override fun onComplete() {
println("Completed Observable.")
}
override fun onSubscribe(d: Disposable) {
}
override fun onError(e: Throwable) {
println("Whoops: " + e.message)
}
})
透過 just(1, 2, 3)
,我們獲得了一個 Observable
,然後建立一個 Observer
來 subscribe
這個 Observable
的事件。
Observer
會透過 onNext
依序收到 Observable
的資料,最後有一個 onComplete
通知整個資料流結束,這邊是他的輸出:
2019-10-01 07:53:30.267 15049-15049/com.example.myapplication I/System.out: 1
2019-10-01 07:53:30.267 15049-15049/com.example.myapplication I/System.out: 2
2019-10-01 07:53:30.267 15049-15049/com.example.myapplication I/System.out: 3
2019-10-01 07:53:30.267 15049-15049/com.example.myapplication I/System.out: Completed Observable.
大家可能會好奇要拿到一個陣列的所有元素,只要用一個 for loop 就可以解決了,為什麼要搞的這麼麻煩呢?
for (i in listOf(1, 2, 3)) {
println(i)
}
一個很重要的差別是主被動的關係,當我們使用 for loop 時,我們是主動的去瀏覽資料,所以當我們迴圈結束後,我們跟資料的關係就結束。
而 Rx 的概念是建立 Observable
與 Observer
這種資料流的關係,Observer
是屬於被動收到資料的角色,只要這個關係一直存在,有任何新資料都可以繼續透過這種連結通知 Observer
,也因為把資料流通常只有一個維度、一個方向,我們的程式碼會擁有比較單純的結構。
我們以上節的例子為基礎,再來介紹什麼是 operator。
假設我們今天只想要奇數的數字,然後輸出的時候要把它乘以 2 ,這邊是用 for loop 的寫法:
for (i in listOf(1, 2, 3)) {
if (i % 2 != 1) {
continue
}
println(i * 2)
}
很正確也很簡潔,但我們所新增的邏輯就像狗皮膏藥一樣貼在程式碼的各個地方,當程式碼一長的時候,我們可能就不一定能夠很快的理解,而 RxJava 可以怎麼寫呢?
Observable
.just(1, 2, 3)
.filter { it % 2 == 1 }
.map { it * 2 }
.subscribe(object : Observer<Int> {
override fun onNext(i: Int) {
println(i)
}
override fun onComplete() {
println("Completed Observable.")
}
override fun onSubscribe(d: Disposable) {
}
override fun onError(e: Throwable) {
println("Whoops: " + e.message)
}
})
大家可以看到我們只有在資料流的中間,加上 filter
跟 map
這二個 operator,就可以解決我們的需求,是不是更簡潔呢?
更多Operator:
http://reactivex.io/documentation/operators.html
這時候就要更了解 operator 到底是什麼,我們發現不論是 filter
或是 map
,都是 Observable
裡面的 function,但這個 function 會把原有的 Observable
跟 operator 裡的程式碼融合成另外一個 Observable
,所以不管有多少層的串接,最後的結果還是一個 Observable
。
public final Observable<T> filter(Predicate<? super T> predicate) {
//...
}
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
//...
}
雖然都是 Observable,但有時型別會被轉換,像上面的
map
如果我們的 function 是寫.map { it.toString() }
這樣出來的Observable
就會是Observable<String>
而不是原有的Observable<Int>
,這個地方要特別小心喔!
以上就是今天的內容了,我們明天會繼續談怎麼 在 Rx 裡切換 Thread,敬請期待!
Android 十全大補已經正式出書上架囉!
有興趣的讀者歡迎參考:
https://www.tenlong.com.tw/products/9789864345786